/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package core.net;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * This class represents a cluster of computers with a tree-shaped,
 * hierarchical network topology. For example, a cluster may consist of many
 * data centers filled with racks of computers. In a network topology, leaves
 * represent data nodes (computers) and inner nodes represent switches/routers
 * that manage traffic in/out of data centers or racks.
 */
public class NetworkTopology {
	/** Path-like name of the default rack. Not referenced elsewhere in this class. */
	public final static String DEFAULT_RACK = "/default-rack";
	/**
	 * Default host level. Not referenced in this class; presumably the tree
	 * level of a host in a "/rack/host" layout -- TODO confirm against callers.
	 */
	public final static int DEFAULT_HOST_LEVEL = 2;
	/** Logger shared by all instances of this class. */
	public static final Log LOG = LogFactory.getLog(NetworkTopology.class);

	/*
	 * An InnerNode represents a switch/router of a data center or rack. Unlike
	 * a leaf node, it has non-null children.
	 */
	/* archmage docs:
	 * For archmage, an InnerNode could also represent a vSwitch in a hypervisor.
	 * TODO:
	 * Not sure whether new behaviors should be added; leave it for future design.
	 */
	private class InnerNode extends HadoopNodeBase {
		// Direct children of this node, in insertion order. They are either
		// leaves or nested InnerNodes.
		private ArrayList<HadoopNode> children = new ArrayList<HadoopNode>();
		// Number of leaves in the whole subtree below this node; incremented by
		// add() and decremented by remove() when they report success.
		private int numOfLeaves;

		/**
		 * Construct an InnerNode from a path-like string. The path is passed
		 * unchanged to the HadoopNodeBase constructor.
		 *
		 * @param path
		 *            path-like string identifying this node
		 */
		InnerNode(String path) {
			super(path);
		}

		/**
		 * Construct an InnerNode from its name and its network location. Both
		 * values are passed unchanged to the HadoopNodeBase constructor.
		 *
		 * @param name
		 *            name of this node
		 * @param location
		 *            network location of this node
		 */
		InnerNode(String name, String location) {
			super(name, location);
		}

		/**
		 * Construct an InnerNode from its name, its network location, its
		 * parent, and its level. Used by add() to create intermediate nodes.
		 *
		 * @param name
		 *            name of this node
		 * @param location
		 *            network location of this node
		 * @param parent
		 *            the InnerNode this node hangs under
		 * @param level
		 *            level of this node in the tree
		 */
		InnerNode(String name, String location, InnerNode parent, int level) {
			super(name, location, parent, level);
		}

		/**
		 * Get the direct children of this node.
		 *
		 * @return the backing list itself, not a copy; changes made through the
		 *         returned collection affect this node
		 */
		Collection<HadoopNode> getChildren() {
			return children;
		}

		/**
		 * Return the number of direct children this node has (not the number of
		 * leaves in its subtree).
		 *
		 * @return the size of the children list
		 */
		int getNumOfChildren() {
			return children.size();
		}

		/**
		 * Tell whether this node represents a rack. A node counts as a rack
		 * when it has no children, or when its first child is not an
		 * InnerNode.
		 *
		 * @return true if this node has no child or its first child is a leaf
		 */
		boolean isRack() {
			return children.isEmpty()
					|| !(children.get(0) instanceof InnerNode);
		}

		/**
		 * Tell whether this node is an ancestor of node <i>n</i>. A node whose
		 * own path equals the path separator is an ancestor of every node;
		 * otherwise the network location of <i>n</i> must start with this
		 * node's path, compared on whole path components.
		 * 
		 * @param n
		 *            a node
		 * @return true if this node is an ancestor of <i>n</i>
		 */
		boolean isAncestor(HadoopNode n) {
			final String separator = HadoopNodeBase.PATH_SEPARATOR_STR;
			if (getPath(this).equals(separator)) {
				return true;
			}
			// A trailing separator on both sides stops "/rack1" from matching
			// "/rack10".
			String candidate = n.getNetworkLocation() + separator;
			return candidate.startsWith(getPath(this) + separator);
		}

		/**
		 * Tell whether this node is the direct parent of node <i>n</i>, i.e.
		 * whether the network location of <i>n</i> equals this node's path.
		 * 
		 * @param n
		 *            a node
		 * @return true if this node is the parent of <i>n</i>
		 */
		boolean isParent(HadoopNode n) {
			String location = n.getNetworkLocation();
			String ownPath = getPath(this);
			return location.equals(ownPath);
		}

		/**
		 * Return the name of the child of this node that lies on the path
		 * towards node <i>n</i>, i.e. the first path component of the network
		 * location of <i>n</i> after this node's own path.
		 * 
		 * @param n
		 *            a node located below this node, but not a direct child
		 * @return the name of the next node on the way down to <i>n</i>
		 * @exception IllegalArgumentException
		 *                if this node is not an ancestor of <i>n</i>
		 */
		private String getNextAncestorName(HadoopNode n) {
			if (!isAncestor(n)) {
				throw new IllegalArgumentException(this
						+ " is not an ancestor of " + n);
			}
			// strip this node's path from the front of n's location
			String name = n.getNetworkLocation().substring(
					getPath(this).length());
			if (name.charAt(0) == PATH_SEPARATOR) {
				name = name.substring(1);
			}
			// keep only the first remaining path component
			int index = name.indexOf(PATH_SEPARATOR);
			if (index != -1) {
				name = name.substring(0, index);
			}
			return name;
		}

		/**
		 * Add node <i>n</i> to the subtree of this node. Intermediate
		 * InnerNodes on the way to the location of <i>n</i> are created on
		 * demand. If the parent already has a child with the same name, that
		 * child is replaced and the leaf count is left unchanged.
		 * 
		 * @param n
		 *            node to be added
		 * @return true if the node is added as a new leaf; false if it replaced
		 *         an existing child with the same name
		 * @exception IllegalArgumentException
		 *                if this node is not an ancestor of <i>n</i>
		 */
		boolean add(HadoopNode n) {
			if (!isAncestor(n)) {
				throw new IllegalArgumentException(n.getName()
						+ ", which is located at " + n.getNetworkLocation()
						+ ", is not a descendent of " + getPath(this));
			}
			if (isParent(n)) {
				// this node is the parent of n; add n directly
				n.setParent(this);
				n.setLevel(this.level + 1);
				for (int i = 0; i < children.size(); i++) {
					if (children.get(i).getName().equals(n.getName())) {
						// same name already present: replace, do not count twice
						children.set(i, n);
						return false;
					}
				}
				children.add(n);
				numOfLeaves++;
				return true;
			}

			// find the child that is the next ancestor of n
			String parentName = getNextAncestorName(n);
			InnerNode parentNode = null;
			for (HadoopNode child : children) {
				if (child.getName().equals(parentName)) {
					parentNode = (InnerNode) child;
					break;
				}
			}
			if (parentNode == null) {
				// no such child yet; create a new InnerNode for it
				parentNode = new InnerNode(parentName, getPath(this), this,
						this.getLevel() + 1);
				children.add(parentNode);
			}
			// add n to the subtree of the next ancestor node; only a genuinely
			// new leaf changes this node's leaf count
			if (parentNode.add(n)) {
				numOfLeaves++;
				return true;
			}
			return false;
		}

		/**
		 * Remove node <i>n</i> from the subtree of this node. An intermediate
		 * child that has no children left after the removal is dropped from
		 * this node as well.
		 * 
		 * @param n
		 *            node to be deleted
		 * @return true if the node is deleted; false if it was not found
		 * @exception IllegalArgumentException
		 *                if this node is not an ancestor of <i>n</i>
		 */
		boolean remove(HadoopNode n) {
			String location = n.getNetworkLocation();
			String ownPath = getPath(this);
			if (!isAncestor(n)) {
				throw new IllegalArgumentException(n.getName()
						+ ", which is located at " + location
						+ ", is not a descendent of " + ownPath);
			}

			if (isParent(n)) {
				// n hangs directly under this node: look it up by name
				int position = 0;
				while (position < children.size()) {
					if (children.get(position).getName().equals(n.getName())) {
						children.remove(position);
						numOfLeaves--;
						n.setParent(null);
						return true;
					}
					position++;
				}
				return false;
			}

			// otherwise delegate to the child on the path towards n
			String nextName = getNextAncestorName(n);
			InnerNode subtree = null;
			int slot;
			for (slot = 0; slot < children.size(); slot++) {
				if (children.get(slot).getName().equals(nextName)) {
					subtree = (InnerNode) children.get(slot);
					break;
				}
			}
			if (subtree == null) {
				return false;
			}
			if (!subtree.remove(n)) {
				return false;
			}
			// prune the intermediate node once it has become empty
			if (subtree.getNumOfChildren() == 0) {
				children.remove(slot);
			}
			numOfLeaves--;
			return true;
		} // end of remove

		/**
		 * Given a node's string representation relative to this node, return a
		 * reference to the node.
		 * 
		 * @param loc
		 *            separator-delimited path below this node; null or empty
		 *            means this node itself
		 * @return the node at <i>loc</i>, or null if no such node exists
		 */
		private HadoopNode getLoc(String loc) {
			if (loc == null || loc.length() == 0) {
				return this;
			}

			// path[0] is the name of the direct child, path[1] (if present) is
			// the remainder to resolve below that child
			String[] path = loc.split(PATH_SEPARATOR_STR, 2);
			HadoopNode childnode = null;
			for (HadoopNode child : children) {
				if (child.getName().equals(path[0])) {
					childnode = child;
					// add() keeps child names unique, so the first match is
					// the only one
					break;
				}
			}
			if (childnode == null) {
				return null; // non-existing node
			}
			if (path.length == 1) {
				return childnode;
			}
			if (childnode instanceof InnerNode) {
				return ((InnerNode) childnode).getLoc(path[1]);
			}
			// a leaf cannot have anything below it
			return null;
		}

		/**
		 * Return the leaf at position <i>leafIndex</i> of this subtree,
		 * counting leaves in child order and skipping the leaves covered by
		 * <i>excludedNode</i>.
		 * 
		 * @param leafIndex
		 *            zero-based index among the leaves that are not excluded
		 * @param excludedNode
		 *            a leaf, or an InnerNode whose whole subtree is skipped;
		 *            null to exclude nothing
		 * @return the selected leaf, or null if <i>leafIndex</i> is out of
		 *         range
		 */
		private HadoopNode getLeaf(int leafIndex, HadoopNode excludedNode) {
			// number of non-excluded leaves in the children visited so far
			int count = 0;
			// check if the excluded node is a leaf (a null excluded node is
			// treated as a leaf here)
			boolean isLeaf = excludedNode == null
					|| !(excludedNode instanceof InnerNode);
			// calculate the total number of excluded leaf nodes; the value is
			// only used below when excludedNode is not null
			int numOfExcludedLeaves = isLeaf ? 1 : ((InnerNode) excludedNode)
					.getNumOfLeaves();
			if (isRack()) { // children are leaves
				if (isLeaf) { // excluded node is a leaf node
					int excludedIndex = children.indexOf(excludedNode);
					if (excludedIndex != -1 && leafIndex >= 0) {
						// excluded node is one of the children, so shift the
						// leaf index past it when it is at or after that slot
						leafIndex = leafIndex >= excludedIndex ? leafIndex + 1
								: leafIndex;
					}
				}
				// range check
				if (leafIndex < 0 || leafIndex >= this.getNumOfChildren()) {
					return null;
				}
				return children.get(leafIndex);
			} else {
				// children are InnerNodes: find the child whose subtree holds
				// the requested index
				for (int i = 0; i < children.size(); i++) {
					InnerNode child = (InnerNode) children.get(i);
					if (excludedNode == null || excludedNode != child) {
						// not the excludedNode
						// (this local shadows the numOfLeaves field)
						int numOfLeaves = child.getNumOfLeaves();
						if (excludedNode != null
								&& child.isAncestor(excludedNode)) {
							// the excluded leaves live inside this child, so
							// they do not count as selectable
							numOfLeaves -= numOfExcludedLeaves;
						}
						if (count + numOfLeaves > leafIndex) {
							// the leaf is in the child subtree; recurse with
							// the index made relative to that child
							return child.getLeaf(leafIndex - count,
									excludedNode);
						} else {
							// go to the next child
							count = count + numOfLeaves;
						}
					} else { // it is the excludedNode
						// skip it and set the excludedNode to null so the
						// remaining children are counted in full
						excludedNode = null;
					}
				}
				return null;
			}
		}

		/**
		 * Return the number of leaves in the subtree below this node, as
		 * tracked by add() and remove().
		 */
		int getNumOfLeaves() {
			return numOfLeaves;
		}
	} // end of InnerNode

	/** Root of the topology tree. */
	InnerNode clusterMap = new InnerNode(InnerNode.ROOT);
	/** Rack counter, adjusted by add() and remove(). */
	private int numOfRacks = 0;
	/** Lock taken by the public methods that read or modify the tree. */
	private final ReadWriteLock netlock = new ReentrantReadWriteLock();

	/** Create a topology that initially holds only the root node. */
	public NetworkTopology() {
	}

	/**
	 * Add a leaf node. Updates the rack counter when the node's network
	 * location did not resolve to an existing node before the insertion.
	 * A null node is ignored.
	 * 
	 * @param node
	 *            node to be added
	 * @exception IllegalArgumentException
	 *                if the node is an inner node, or if its network location
	 *                resolves to an existing leaf
	 */
	public void add(HadoopNode node) {
		if (node == null) {
			return;
		}
		if (node instanceof InnerNode) {
			throw new IllegalArgumentException(
					"Not allow to add an inner node: " + HadoopNodeBase.getPath(node));
		}
		netlock.writeLock().lock();
		try {
			// look up the rack before inserting so we can tell afterwards
			// whether the insertion created it
			HadoopNode rack = getNode(node.getNetworkLocation());
			if (rack != null && !(rack instanceof InnerNode)) {
				throw new IllegalArgumentException("Unexpected data node "
						+ node.toString() + " at an illegal network location");
			}
			if (clusterMap.add(node)) {
				LOG.info("Adding a new node: " + HadoopNodeBase.getPath(node));
				if (rack == null) {
					numOfRacks++;
				}
			}
			// toString() walks every leaf; only pay for it when the message
			// will actually be logged
			if (LOG.isDebugEnabled()) {
				LOG.debug("NetworkTopology became:\n" + this.toString());
			}
		} finally {
			netlock.writeLock().unlock();
		}
	}

	/**
	 * Remove a leaf node. Updates the rack counter when the node's network
	 * location no longer resolves to a node after the removal. A null node is
	 * ignored.
	 * 
	 * @param node
	 *            node to be removed
	 * @exception IllegalArgumentException
	 *                if the node is an inner node
	 */
	public void remove(HadoopNode node) {
		if (node == null) {
			return;
		}
		if (node instanceof InnerNode) {
			throw new IllegalArgumentException(
					"Not allow to remove an inner node: "
							+ HadoopNodeBase.getPath(node));
		}
		LOG.info("Removing a node: " + HadoopNodeBase.getPath(node));
		netlock.writeLock().lock();
		try {
			if (clusterMap.remove(node)) {
				// the rack disappears from the tree once its last leaf is gone
				InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
				if (rack == null) {
					numOfRacks--;
				}
			}
			// toString() walks every leaf; only pay for it when the message
			// will actually be logged
			if (LOG.isDebugEnabled()) {
				LOG.debug("NetworkTopology became:\n" + this.toString());
			}
		} finally {
			netlock.writeLock().unlock();
		}
	}

	/**
	 * Check whether the tree contains node <i>node</i> by following its
	 * parent links upwards, at most as many steps as the node's level, and
	 * looking for the root of this topology.
	 * 
	 * @param node
	 *            a node
	 * @return true if <i>node</i> is already in the tree; false otherwise,
	 *         including when <i>node</i> is null
	 */
	public boolean contains(HadoopNode node) {
		if (node == null) {
			return false;
		}
		netlock.readLock().lock();
		try {
			HadoopNode ancestor = node.getParent();
			int stepsLeft = node.getLevel();
			while (ancestor != null && stepsLeft > 0) {
				if (ancestor == clusterMap) {
					return true;
				}
				ancestor = ancestor.getParent();
				stepsLeft--;
			}
		} finally {
			netlock.readLock().unlock();
		}
		return false;
	}

	/**
	 * Given a string representation of a node, return its reference. The
	 * string is normalized first; unless it equals the root, its first
	 * character is dropped before the lookup starts at the root node.
	 * 
	 * @param loc
	 *            a path-like string representation of a node
	 * @return a reference to the node; null if the node is not in the tree
	 */
	public HadoopNode getNode(String loc) {
		netlock.readLock().lock();
		try {
			String normalized = HadoopNodeBase.normalize(loc);
			String relative = HadoopNodeBase.ROOT.equals(normalized)
					? normalized
					: normalized.substring(1);
			return clusterMap.getLoc(relative);
		} finally {
			netlock.readLock().unlock();
		}
	}

	/**
	 * Return the total number of racks, read under the read lock.
	 *
	 * @return the current value of the rack counter
	 */
	public int getNumOfRacks() {
		netlock.readLock().lock();
		try {
			return numOfRacks;
		} finally {
			netlock.readLock().unlock();
		}
	}

	/**
	 * Return the total number of leaf nodes in the tree, read under the read
	 * lock.
	 *
	 * @return the leaf count kept by the root node
	 */
	public int getNumOfLeaves() {
		netlock.readLock().lock();
		try {
			return clusterMap.getNumOfLeaves();
		} finally {
			netlock.readLock().unlock();
		}
	}

	/**
	 * Return the distance between two nodes. It is assumed that the distance
	 * from one node to its parent is 1. The distance between two nodes is
	 * calculated by summing up their distances to their closest common
	 * ancestor.
	 * 
	 * @param node1
	 *            one node
	 * @param node2
	 *            another node
	 * @return 0 if both arguments are the same reference; otherwise the
	 *         distance between node1 and node2, or Integer.MAX_VALUE if
	 *         exactly one of them is null or the walk towards the root runs
	 *         out of parents for either node
	 */
	public int getDistance(HadoopNode node1, HadoopNode node2) {
		if (node1 == node2) {
			return 0;
		}
		if (node1 == null || node2 == null) {
			// only one of them is null here; there is no tree position to
			// measure from
			LOG.warn("One of the nodes is a null pointer");
			return Integer.MAX_VALUE;
		}
		HadoopNode n1 = node1, n2 = node2;
		int dis = 0;
		netlock.readLock().lock();
		try {
			int level1 = node1.getLevel(), level2 = node2.getLevel();
			// bring the deeper node up to the level of the shallower one
			while (n1 != null && level1 > level2) {
				n1 = n1.getParent();
				level1--;
				dis++;
			}
			while (n2 != null && level2 > level1) {
				n2 = n2.getParent();
				level2--;
				dis++;
			}
			// climb in lock-step until both nodes share the same parent
			while (n1 != null && n2 != null && n1.getParent() != n2.getParent()) {
				n1 = n1.getParent();
				n2 = n2.getParent();
				dis += 2;
			}
		} finally {
			netlock.readLock().unlock();
		}
		if (n1 == null) {
			LOG.warn("The cluster does not contain node: "
					+ HadoopNodeBase.getPath(node1));
			return Integer.MAX_VALUE;
		}
		if (n2 == null) {
			LOG.warn("The cluster does not contain node: "
					+ HadoopNodeBase.getPath(node2));
			return Integer.MAX_VALUE;
		}
		// one more hop from each side to reach the shared parent
		return dis + 2;
	}

	/**
	 * Check if two nodes are on the same rack, i.e. whether they have the
	 * same parent reference.
	 * 
	 * @param node1
	 *            one node
	 * @param node2
	 *            another node
	 * @return true if node1 and node2 have the same parent; false otherwise,
	 *         including when either argument is null
	 */
	public boolean isOnSameRack(HadoopNode node1, HadoopNode node2) {
		if (node1 != null && node2 != null) {
			netlock.readLock().lock();
			try {
				return node1.getParent() == node2.getParent();
			} finally {
				netlock.readLock().unlock();
			}
		}
		return false;
	}

	/** Shared random source for chooseRandom() and pseudoSortByDistance(). */
	private static final Random r = new Random();

	/**
	 * Randomly choose one node from <i>scope</i>. If scope starts with ~, the
	 * node is chosen from all nodes except the ones in <i>scope</i>;
	 * otherwise it is chosen from <i>scope</i> itself.
	 * 
	 * @param scope
	 *            range of nodes from which a node will be chosen
	 * @return the chosen node
	 */
	public HadoopNode chooseRandom(String scope) {
		netlock.readLock().lock();
		try {
			boolean inverted = scope.startsWith("~");
			if (!inverted) {
				return chooseRandom(scope, null);
			}
			// "~x" means: pick from the whole tree, but not from x
			String excluded = scope.substring(1);
			return chooseRandom(HadoopNodeBase.ROOT, excluded);
		} finally {
			netlock.readLock().unlock();
		}
	}

	/**
	 * Randomly choose one leaf from <i>scope</i> that is not covered by
	 * <i>excludedScope</i>.
	 * 
	 * @param scope
	 *            path of the subtree to choose from
	 * @param excludedScope
	 *            path of a node or subtree to leave out; null to exclude
	 *            nothing
	 * @return the chosen node; the node at <i>scope</i> itself when it is not
	 *         an inner node (possibly null); null when <i>scope</i> lies
	 *         inside <i>excludedScope</i> or when no candidate leaf remains
	 */
	private HadoopNode chooseRandom(String scope, String excludedScope) {
		if (excludedScope != null) {
			if (scope.startsWith(excludedScope)) {
				// everything in scope is excluded
				return null;
			}
			if (!excludedScope.startsWith(scope)) {
				// the exclusion lies outside scope and cannot affect it
				excludedScope = null;
			}
		}
		HadoopNode node = getNode(scope);
		if (!(node instanceof InnerNode)) {
			return node;
		}
		InnerNode innerNode = (InnerNode) node;
		int numOfDatanodes = innerNode.getNumOfLeaves();
		HadoopNode excludedNode = null;
		if (excludedScope != null) {
			excludedNode = getNode(excludedScope);
			if (excludedNode instanceof InnerNode) {
				numOfDatanodes -= ((InnerNode) excludedNode).getNumOfLeaves();
			} else if (excludedNode != null) {
				numOfDatanodes -= 1;
			}
			// an excluded scope that does not resolve to a node removes no
			// candidates
		}
		if (numOfDatanodes <= 0) {
			// Random.nextInt requires a positive bound; nothing to choose from
			return null;
		}
		int leaveIndex = r.nextInt(numOfDatanodes);
		return innerNode.getLeaf(leaveIndex, excludedNode);
	}

	/**
	 * Return the number of leaves in <i>scope</i> but not in
	 * <i>excludedNodes</i>. If scope starts with ~, return the number of
	 * nodes that are neither in <i>scope</i> nor in <i>excludedNodes</i>.
	 * A scope that does not resolve to a node contributes no nodes.
	 * 
	 * @param scope
	 *            a path string that may start with ~
	 * @param excludedNodes
	 *            a list of nodes
	 * @return number of available nodes
	 */
	public int countNumOfAvailableNodes(String scope, List<HadoopNode> excludedNodes) {
		boolean isExcluded = false;
		if (scope.startsWith("~")) {
			isExcluded = true;
			scope = scope.substring(1);
		}
		scope = HadoopNodeBase.normalize(scope);
		int count = 0; // the number of nodes in both scope & excludedNodes
		netlock.readLock().lock();
		try {
			// trailing separators make the prefix test match whole path
			// components only
			String scopePrefix = scope + HadoopNodeBase.PATH_SEPARATOR_STR;
			for (HadoopNode node : excludedNodes) {
				if ((HadoopNodeBase.getPath(node) + HadoopNodeBase.PATH_SEPARATOR_STR)
						.startsWith(scopePrefix)) {
					count++;
				}
			}
			HadoopNode n = getNode(scope);
			int scopeNodeCount = 0;
			if (n instanceof InnerNode) {
				scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
			} else if (n != null) {
				// scope names a single leaf
				scopeNodeCount = 1;
			}
			if (isExcluded) {
				// all leaves, minus those in scope, minus the excluded nodes
				// that are outside scope
				return clusterMap.getNumOfLeaves() - scopeNodeCount
						- excludedNodes.size() + count;
			} else {
				return scopeNodeCount - count;
			}
		} finally {
			netlock.readLock().unlock();
		}
	}

	/**
	 * Convert the network tree to a string: the rack count, the leaf count,
	 * and then the path of every leaf, one per line. The read lock is held
	 * for the whole traversal so the counts and the listed leaves come from
	 * the same state of the tree.
	 * 
	 * @return a multi-line description of the topology
	 */
	@Override
	public String toString() {
		StringBuilder tree = new StringBuilder();
		netlock.readLock().lock();
		try {
			// print the number of racks
			tree.append("Number of racks: ");
			tree.append(numOfRacks);
			tree.append("\n");
			// print the number of leaves
			int numOfLeaves = getNumOfLeaves();
			tree.append("Expected number of leaves:");
			tree.append(numOfLeaves);
			tree.append("\n");
			// print nodes
			for (int i = 0; i < numOfLeaves; i++) {
				tree.append(HadoopNodeBase.getPath(clusterMap.getLeaf(i, null)));
				tree.append("\n");
			}
		} finally {
			netlock.readLock().unlock();
		}
		return tree.toString();
	}

	/* Exchange the elements at positions i and j of the array. */
	static private void swap(HadoopNode[] nodes, int i, int j) {
		final HadoopNode held = nodes[j];
		nodes[j] = nodes[i];
		nodes[i] = held;
	}

	/**
	 * Partially sort the nodes array by distance to <i>reader</i>. It
	 * linearly scans the array; if a local node is found, it is swapped with
	 * the first element of the array. If a local rack node is found, it is
	 * swapped with the first element following the local node. If neither a
	 * local node nor a local rack node is found, a random element is put at
	 * position 0. The rest of the nodes are left untouched.
	 * 
	 * @param reader
	 *            the node the distances are relative to; may be null, in
	 *            which case only the random placement is done
	 * @param nodes
	 *            the array to reorder in place
	 */
	public void pseudoSortByDistance(HadoopNode reader, HadoopNode[] nodes) {
		int tempIndex = 0;
		// index of the first local-rack node seen, or -1 if none
		int localRackNode = -1;
		if (reader != null) {
			// scan the array to find the local node & local rack node
			for (int i = 0; i < nodes.length; i++) {
				if (tempIndex == 0 && reader == nodes[i]) { // local node
					// swap the local node and the node at position 0
					if (i != 0) {
						swap(nodes, tempIndex, i);
					}
					tempIndex = 1;
					if (localRackNode != -1) {
						if (localRackNode == 0) {
							// the rack node that sat at 0 was just moved to i
							localRackNode = i;
						}
						break;
					}
				} else if (localRackNode == -1
						&& isOnSameRack(reader, nodes[i])) {
					// local rack
					localRackNode = i;
					if (tempIndex != 0) {
						break;
					}
				}
			}

			// swap the local rack node and the node at position tempIndex
			if (localRackNode != -1 && localRackNode != tempIndex) {
				swap(nodes, tempIndex, localRackNode);
				tempIndex++;
			}
		}

		// put a random node at position 0 only if neither a local node nor a
		// local-rack node was found; a local-rack node already sitting at
		// position 0 leaves tempIndex at 0 and must not be displaced
		if (tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
			swap(nodes, 0, r.nextInt(nodes.length));
		}
	}
}
